{--
    Support for concurrency.
    
    Concurrency in Frege comes in 2 flavors. 
    The first is through 'Thread's, which are,
    unlike in Haskell, _OS threads_.
  
    The second possibility is to use a thread pool and an executor service 
    one can submit tasks to. But note that, unlike a Haskell green thread,
    an asynchronous task that performs a blocking action blocks an OS thread.
-}    

module frege.control.Concurrent where

import frege.java.util.Concurrent as C

--- A thread-safe shared variable that is, at any moment, either full or empty.
--- Technically, it is a 'BlockingQueue' whose capacity is restricted to 1.
abstract newtype MVar a  =  MV (MutableIO (BlockingQueue a)) where

    --- Create an 'MVar' that holds no value.
    newEmpty = do
        queue <- ArrayBlockingQueue.new 1
        return (MV queue)

    --- Create an 'MVar' that already holds the given value.
    new value = do
        mv <- newEmpty
        mv.put value
        return mv

    --- Store a value in the 'MVar'; blocks if it is full.
    put (MV queue) value = queue.put value

    --- Remove and return the value of the 'MVar'; blocks if it is empty.
    take (MV queue) = queue.take

    --- Try to store a value in the 'MVar' without blocking;
    --- returns false if it is already full.
    offer (MV queue) value = queue.offer value

    --- Try to get the value from the 'MVar' without blocking;
    --- returns 'Nothing' when it is empty.
    poll (MV queue) = queue.poll

    --- Like 'poll', but waits up to the given number of milliseconds
    --- for a value to become available.
    wait (MV queue) millis = queue.poll millis TimeUnit.milliSeconds
    
-- Haskell compatibility: the usual Haskell names, forwarding to the 'MVar' functions

--- Haskell name for 'MVar.newEmpty'
newEmptyMVar = MVar.newEmpty

--- Haskell name for 'MVar.new'
newMVar value = MVar.new value

--- Haskell name for 'MVar.take'
takeMVar mv = MVar.take mv

--- Haskell name for 'MVar.put'
putMVar mv value = MVar.put mv value

--- Haskell name for 'MVar.poll'
tryTakeMVar mv = MVar.poll mv

--- Haskell name for 'MVar.offer'
tryPutMVar mv value = MVar.offer mv value


--- Wrap an 'IO' action in a 'Runnable', run it in a freshly created
--- OS 'Thread', and return that (already started) thread.
forkOS :: IO () -> IOMutable Thread
forkOS action =
    Runnable.new action >>= \job ->
    Thread.new job      >>= \thread ->
    thread.start        >>
    return thread

--- Submit the 'IO' action to an 'ExecutorService' for asynchronous execution.
--- Do not use this for processes that never end!
--- The executor service may manage a fixed small number of concurrent threads only.
forkIO :: IO () -> IO ()
forkIO action = do
    pool <- ExecutorService.executorService ()
    task <- Runnable.new action
    pool.submit task

--- Shut down the 'ExecutorService'
shutdown = do
    pool <- ExecutorService.executorService ()
    pool.shutdown

--- Run an 'IO' action asynchronously and return an 'MVar' that receives its outcome.
--- The action is wrapped in a 'Runnable' and submitted to the 'ExecutorService'.
--- If the action completes, its result is put in the 'MVar' as 'Right';
--- if an 'Exception' is caught, that exception is put in the 'MVar' as 'Left'.
--- The 'MVar' is returned right after the submit, without waiting for the action.
async :: IO a -> IO (MVar (Exception | a))
async action = do
        service <- ExecutorService.executorService ()
        mvar <- newEmptyMVar
        -- The task that runs in the executor: store the result or the caught exception.
        -- The `finally` clause then makes a non-blocking offer of a placeholder 'Left',
        -- presumably so that the 'MVar' does not stay empty when neither of the
        -- other two branches stored a value (the offer has no effect on a full 'MVar').
        -- NOTE(review): if a consumer takes the result before the `finally` clause
        -- runs, the offer would apparently succeed and leave the placeholder in the
        -- 'MVar' - confirm whether this matters to callers.
        let embedded = do
                action >>=    mvar.put . Right
                    `catch`   mvar.put . Left
                    `finally` mvar.offer (Left (error "async: no end?")) >> return ()
        Runnable.new embedded >>= service.submit
        return mvar 
